package cn.dfun.sample.flink.apitest

import java.sql.{DriverManager, PreparedStatement}
import java.{sql, util}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

/**
  * JDBC Sink测试
  */
/**
  * Entry point: reads simulated sensor data from [[MySensorSource]] and writes
  * it to MySQL through the custom [[MyJdbcFunc]] sink.
  */
object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallelism keeps output ordering deterministic for this demo.
    env.setParallelism(1)

    // NOTE(review): this file-based pipeline is never connected to a sink —
    // it is kept only so existing behavior (the source is still registered on
    // the environment) is unchanged. Consider removing it or wiring it up.
    val inputPath = "C:\\wor\\flink-sample\\src\\main\\resources\\sensor"
    val inputStream = env.readTextFile(inputPath)
    // Parse each CSV line "id,timestamp,temperature" into a SensorReading.
    val dataStream = inputStream
      .map(line => {
        val fields = line.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // The actual job: generated sensor readings flow into the JDBC sink.
    val stream = env.addSource(new MySensorSource())
    stream.addSink(new MyJdbcFunc)

    env.execute("jdbc sink test")
  }
}

/**
  * JDBC sink that upserts a [[SensorReading]] into the `sensor_temp` table:
  * an UPDATE is attempted first, and when no row matched the id an INSERT
  * is issued instead.
  */
class MyJdbcFunc extends RichSinkFunction[SensorReading] {
  // Connection and prepared statements; initialized once per task in open().
  var conn: sql.Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://node-01:3306/test", "root", "123")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  /** Called once per record: upsert the reading keyed by sensor id. */
  override def invoke(value: SensorReading): Unit = {
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    // executeUpdate() returns the affected-row count directly, replacing the
    // original execute() + getUpdateCount round-trip; 0 rows means the id is
    // not present yet, so fall back to INSERT.
    if (updateStmt.executeUpdate() == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  override def close(): Unit = {
    // Guard each handle: if open() failed part-way (e.g. connection refused),
    // the fields are still null and the original unguarded close() would NPE.
    if (insertStmt != null) insertStmt.close()
    if (updateStmt != null) updateStmt.close()
    if (conn != null) conn.close()
  }
}
